#!/usr/bin/env perl

use warnings;

use English;
use FindBin;
use lib "$FindBin::Bin/../../lib";

use strict;
use DBDefs;
use integer;

use Getopt::Long;
use HTTP::Status qw(
    RC_OK
    RC_NOT_FOUND
    RC_NOT_MODIFIED
    is_server_error
);
use String::ShellQuote qw( shell_quote );

use MusicBrainz::Server::Replication qw(
    :replication_type
    REPLICATION_ACCESS_URI
);
use MusicBrainz::Server::Replication::Packet qw(
    decompress_packet
    retrieve_remote_file
);

# Command-line option defaults.
my $fHelp;
# True when STDOUT is a terminal.
# NOTE(review): not referenced anywhere else in this script.
my $fProgress = -t STDOUT;
my $tmpdir = '/tmp';
my $fKeepFiles = 0;
# Maximum number of packets to apply in this run; 0 means no limit.
my $limit = 0;
# Number of packets applied so far in this run.
my $count = 0;
my $baseuri = REPLICATION_ACCESS_URI;
# $deflockfile preserves the default so that the usage text can still show
# it after --lockfile has overridden $lockfile.
my $lockfile = my $deflockfile = '/tmp/.mb-LoadReplicationChanges';
# Extra arguments handed to every ProcessReplicationChanges invocation.
my @process_opts;
my $fVerifySig = 0;
my $database = 'READWRITE';
my $dbmirror2 = DBDefs->REPLICATION_USE_DBMIRROR2;

GetOptions(
    'help'                      => \$fHelp,
    'lockfile=s'                => \$lockfile,
    'base-uri=s'                => \$baseuri,
    'dbmirror2!'                => \$dbmirror2,
    'process-arg=s'             => \@process_opts,
    'limit=i'                   => \$limit,
    'verify-sig'                => \$fVerifySig,
    'database=s'                => \$database,
) or exit 2;

# Honour --help before the mirror check below, so that the help text can be
# displayed on any server instead of dying with "This is not a mirror server!".
usage(), exit if $fHelp;

# Only mirror servers (or TEST* databases) may load replication packets.
(DBDefs->REPLICATION_TYPE == RT_MIRROR || $database =~ /^TEST/)
    or die "This is not a mirror server!\n";

if ($database ne 'READWRITE') {
    # NOTE(review): ProcessReplicationChanges is started with the list form
    # of system (no shell), so this shell quoting may be unnecessary for
    # that call -- confirm how the argument is consumed.
    push @process_opts, '--database=' . shell_quote($database);
}

# Strip one trailing slash so that "$baseuri/$file" never contains "//".
$baseuri =~ s/\/$//;

# Print the command-line help text to the currently selected output handle.
# The default lock file path is interpolated from $deflockfile.
sub usage
{
    print <<EOF;
Usage: LoadReplicationChanges [options]

    --help             show this help
    --lockfile=FILE    use FILE as the lock file to prevent us running in
                       parallel (default: $deflockfile)
    --base-uri=URI     load the replication data from this location
                       (default: from metabrainz.org via HTTP)
    --[no]dbmirror2    load dbmirror2 (v2) replication packets; use
                       --nodbmirror2 for the original dbmirror packets
                       (default: REPLICATION_USE_DBMIRROR2 in DBDefs.pm)
    --process-arg=ARG  Add ARG to each invocation of ProcessReplicationChanges.
                       If required, ARG can be an option (e.g.
                       --process-arg=--debug-xact).  Add --process-arg=ARG
                       again to specify additional arguments.
    --limit=N          apply only N packets
    --verify-sig       If this option is present the replication packets will
                       be verified against their signature.
    --database=NAME    database to load packets into (default: READWRITE)

EOF
}

# --help: show the usage text and exit successfully.
if ($fHelp) {
    usage();
    exit;
}

# Leftover positional arguments are a usage error.
if (@ARGV) {
    usage();
    exit 2;
}

# When the default replication URI is in use, insist on a well-formed
# access token (40 alphanumeric characters) before going any further.
if ($baseuri eq REPLICATION_ACCESS_URI) {
    my $access_token = DBDefs->REPLICATION_ACCESS_TOKEN;
    unless ($access_token =~ /^[a-zA-Z0-9]{40}$/) {
        die 'Invalid or missing REPLICATION_ACCESS_TOKEN in DBDefs.pm -- get one at https://metabrainz.org';
    }
}

# Turn an interrupt into an exception.
$SIG{'INT'} = sub { die "SIGINT\n" };

# Take an exclusive, non-blocking lock on the lock file so that only one
# instance runs at a time.  $lockfh stays in file scope, which keeps the
# handle (and therefore the lock) open for the rest of the run.
my $lockfh;
{
    use Fcntl qw( LOCK_EX LOCK_NB O_CREAT O_WRONLY );

    sysopen($lockfh, $lockfile, O_CREAT|O_WRONLY, 0600)
        or die "open >$lockfile: $OS_ERROR";

    unless (flock($lockfh, LOCK_EX|LOCK_NB)) {
        # Anything other than "lock is held elsewhere" is a real error.
        die "flock $lockfile: $OS_ERROR"
            unless $OS_ERROR{EWOULDBLOCK};

        print localtime() . " : There is already an instance of LoadReplicationChanges running - aborting\n";
        exit;
    }
}

use MusicBrainz::Server::Context;
use Sql;

# Create a script context for the selected database and keep handles to it.
my $c = MusicBrainz::Server::Context->create_script_context(database => $database);
my $sql = Sql->new($c->conn);
my $dbh = $c->dbh;

# Fetch current_schema_sequence and current_replication_sequence from
# replication_control.  A missing row leaves both values undefined.
my ($iSchemaSequence, $iReplicationSequence) = do {
    my $control = $sql->select_single_row_hash('SELECT * FROM replication_control') || {};
    (
        $control->{current_schema_sequence},
        $control->{current_replication_sequence},
    );
};

# The database schema must be the one this codebase expects.
unless ($iSchemaSequence == DBDefs->DB_SCHEMA_SEQUENCE) {
    printf STDERR "%s : Schema sequence mismatch - codebase is %d, database is %d\n",
        scalar localtime,
        DBDefs->DB_SCHEMA_SEQUENCE,
        $iSchemaSequence,
        ;
    exit 1;
}

# Without a replication sequence there is nothing to continue from.
if (!defined $iReplicationSequence) {
    print localtime() . ' : This database does not correspond to any replication sequence'
        . " - you cannot update this database using replication\n";
    exit 1;
}

check_foreign_keys();

# Names of the data files (under mbdump/) expected inside each packet; the
# set depends on which dbmirror format is in use.
my @pending_files = $dbmirror2
    ? qw( pending_data pending_keys pending_ts )
    : qw( dbmirror_pending dbmirror_pendingdata );

# Replication sequences whose packets are known to be broken.
my %known_broken_sequences = map { $_ => 1 } (
    # MBS-9366
    104949,
    # MBS-13333
    162718 .. 162724,
);

# Set when leftover pending data from an earlier run is detected.
my $aborted_sequence;

# Rows left over in the pending table mean an earlier run imported a packet
# without finishing it.
my $pending_count_query = $dbmirror2
    ? 'SELECT count(*) FROM dbmirror2.pending_data'
    : 'SELECT count(*) FROM dbmirror_pending';

if ($sql->select_single_value($pending_count_query)) {
    $aborted_sequence = $iReplicationSequence + 1;

    unless ($known_broken_sequences{$aborted_sequence}) {
        # Ordinary case: skip the download and apply what is already loaded.
        print localtime() . " : Continuing a previously aborted load\n";
        goto APPLY_CHANGES;
    }

    # Known-broken packet: discard the leftover pending data and fall
    # through to download the packet again.
    print localtime() . ' : Looks like we may have previously tried ' .
        "to apply packet #$aborted_sequence, which is known to be " .
        "broken, and failed. Retrying in case the packet was fixed.\n";

    for my $pending_table (qw(
        dbmirror_pending
        dbmirror_pendingdata
        dbmirror2.pending_data
        dbmirror2.pending_keys
        dbmirror2.pending_ts
    )) {
        $sql->auto_commit(1);
        $sql->do("TRUNCATE $pending_table CASCADE");
    }
}

NEXT_PACKET:

# Download next replication packet (current_replication_sequence+1).
# dbmirror2 packets carry a "-v2" suffix in their file name.

my $iNextReplicationSequence = $iReplicationSequence + 1;
my $file = "replication-$iNextReplicationSequence.tar.bz2";
if ($dbmirror2) {
    $file = "replication-$iNextReplicationSequence-v2.tar.bz2";
}

my $url = "$baseuri/$file";
my $localfile = "$tmpdir/$file";

# Counts retries after server errors; reset for every packet.
my $packet_retrieval_attempts = 0;
while (1) {
    my $resp = retrieve_remote_file($url, $localfile, $fVerifySig);
    my $code = $resp->code;
    if ($code == RC_OK || $code == RC_NOT_MODIFIED) {
        last;
    } elsif ($code == RC_NOT_FOUND) {
        # TODO check for newer replication packets, in case the server has lost one?
        # die if so

        # Otherwise exit ok
        print localtime() . " : Replication packet #$iNextReplicationSequence not available\n";
        exit 0;
    } else {
        print localtime() . " : Error retrieving $url:\n";
        print $resp->as_string;
        # Retry up to 3 times on server errors.
        if (is_server_error($code) && (++$packet_retrieval_attempts) <= 3) {
            print localtime() . " : Retrying in 10 seconds...\n";
            sleep 10;
            next;
        }
        # Give up with a diagnostic on STDERR rather than a bare "Died".
        die "Failed to retrieve $url (HTTP status $code)\n";
    }
}

# We successfully retrieved the replication packet.
# Decompress it to a temporary directory

# From this point on, an interrupt exits with status 3 rather than dying.
$SIG{'INT'} = sub { exit 3 };

my $mydir = decompress_packet(
    'loadrep-XXXXXX',
    $tmpdir,
    $localfile,
    not $fKeepFiles
);

# Read SCHEMA_SEQUENCE.  Check it matches the current_schema_sequence
my $SCHEMA_SEQUENCE = read_file('SCHEMA_SEQUENCE');
unless (defined($SCHEMA_SEQUENCE) and $SCHEMA_SEQUENCE =~ /\A(\d+)\n\z/)
{
    print localtime() . " : SCHEMA_SEQUENCE file missing or malformed\n";
    exit 1;
}
$SCHEMA_SEQUENCE = $1;

unless ($SCHEMA_SEQUENCE == $iSchemaSequence)
{
    printf '%s : This replication packet matches schema sequence #%d'
        . ", but the database is currently at #%d\n",
        scalar localtime,
        $SCHEMA_SEQUENCE,
        $iSchemaSequence,
        ;
    print localtime() . " : You must upgrade your database in order to apply this replication packet\n";
    # See NOTE-SEQM-1 in Script::JSONDump::Incremental
    exit 3;
}

# Read REPLICATION_SEQUENCE.  Check that it matches the one we thought we'd
# downloaded.

my $REPLICATION_SEQUENCE = read_file('REPLICATION_SEQUENCE');
unless (defined($REPLICATION_SEQUENCE) and $REPLICATION_SEQUENCE =~ /\A(\d+)\n\z/)
{
    print localtime() . " : REPLICATION_SEQUENCE file missing or malformed\n";
    exit 1;
}
$REPLICATION_SEQUENCE = $1;

unless ($REPLICATION_SEQUENCE == $iNextReplicationSequence)
{
    print localtime() . " : Oops!  Downloaded packet $file"
        . ", but instead of being sequence #$iNextReplicationSequence"
        . " it's actually #$REPLICATION_SEQUENCE!\n";
    print localtime() . " : Please tell support\@musicbrainz.org!\n";
    exit 1;
}

# Read and show TIMESTAMP

my $TIMESTAMP = read_file('TIMESTAMP');
if (defined($TIMESTAMP) and $TIMESTAMP =~ /\A(.*)\n\z/)
{
    # Keep just the single line, without its trailing newline.
    $TIMESTAMP = $1;
    print localtime() . " : This packet was produced (or begins) at $TIMESTAMP\n";
} else {
    print localtime() . " : TIMESTAMP file missing or malformed, ignoring\n";
    # Really ignore it: a malformed value must not be left defined, or it
    # would later be compared against last_replication_date.
    $TIMESTAMP = undef;
}

# DBMIRROR_VERSION is optional; anything but a single number counts as absent.
my $DBMIRROR_VERSION = read_file('DBMIRROR_VERSION');
if (defined($DBMIRROR_VERSION) and $DBMIRROR_VERSION =~ /\A(\d+)\n\z/)
{
    $DBMIRROR_VERSION = $1;
} else {
    $DBMIRROR_VERSION = undef;
}

if ($dbmirror2 && !(defined $DBMIRROR_VERSION && $DBMIRROR_VERSION == 2)) {
    print localtime() . " : dbmirror2 is enabled, but DBMIRROR_VERSION != 2\n";
    exit 1;
}

# As this code may be executed multiple times via `goto`, the declaration
# must explicitly assign an empty list in order to clobber any previous
# value.
my @pending_file_paths = ();

# Check existence of the data files
for my $f (@pending_files)
{
    my $pending_file_path = "$mydir/mbdump/$f";
    if (-f $pending_file_path) {
        push @pending_file_paths, $pending_file_path;
    } elsif (
        # NOTE-NOPK-1: The pending_keys file isn't actually required to apply
        # dbmirror2 packets. If it's missing, we just query for primary keys
        # from the database instead. It *should* exist, but there are at
        # least three packets missing it (158338, 158339, 158340) due to a
        # mishap.
        $f ne 'pending_keys'
    ) {
        print localtime() . " : There is no mbdump/$f file in this archive!\n";
        print localtime() . " : Please tell support\@musicbrainz.org!\n";
        exit 1;
    }
}

# Load them
print localtime() . " : Importing the pending data files\n";
system "$FindBin::Bin/ImportReplicationChanges",
    '--database', $database,
    ($dbmirror2 ? '--dbmirror2' : '--nodbmirror2'),
    '--tmp-dir', $tmpdir,
    '--',
    @pending_file_paths;
if ($CHILD_ERROR) {
    # $CHILD_ERROR is a wait status, not an exit code: a child that exits
    # with 1 yields 256, which would truncate to exit code 0 if passed to
    # exit unchanged.  Propagate the child's exit code instead, or 1 if it
    # has none (e.g. it was killed by a signal).
    exit((($CHILD_ERROR >> 8) & 0xFF) || 1);
}

# Wipe the temporary directory
print localtime() . " : Removing $mydir\n";
use File::Path qw( rmtree );
rmtree($mydir);

# If we had some data leftover in the tables, we jump straight in here:
APPLY_CHANGES:

# NOTE(review): the option is pushed onto the shared @process_opts and is
# never removed, so it also applies to packets processed later in this run --
# confirm that this is intended.
if (defined $aborted_sequence && $known_broken_sequences{$aborted_sequence}) {
    push @process_opts, '--ignore-conflicts';
}

# Apply the changes (ProcessReplicationChanges)
system "$FindBin::Bin/ProcessReplicationChanges", @process_opts,
    ($dbmirror2 ? '--dbmirror2' : '--nodbmirror2');
if ($CHILD_ERROR) {
    # $CHILD_ERROR is a wait status, not an exit code: passing it to exit
    # unchanged would turn a child exit code of 1 (status 256) into 0.
    # Propagate the child's exit code, or 1 if it has none.
    exit((($CHILD_ERROR >> 8) & 0xFF) || 1);
}

# Optional site-specific hook, run only if it exists and is executable.
if (-x "$FindBin::Bin/hooks/post-process") {
    print localtime() . " : Running post-process hook\n";
    system "$FindBin::Bin/hooks/post-process";
    if ($CHILD_ERROR) {
        # Same wait-status-to-exit-code conversion as above.
        exit((($CHILD_ERROR >> 8) & 0xFF) || 1);
    }
}

# Check that current_replication_sequence has gone up by one
# Also last_replication_date should match TIMESTAMP
{
    my $row = $sql->select_single_row_hash('SELECT * FROM replication_control');
    my $current_replication_sequence = $row->{current_replication_sequence};
    my $last_replication_date = $row->{last_replication_date};
    # If we're applying a previously aborted sequence, $REPLICATION_SEQUENCE
    # and $TIMESTAMP will not be defined.
    if (defined $aborted_sequence) {
        # Clear the marker so that later packets are verified normally.
        $aborted_sequence = undef;
        # The message ends with a newline so that the next log line does
        # not run on from it.
        print
            localtime() .
            " : Successfully applied a previously aborted load; " .
            "current_replication_sequence is $current_replication_sequence and " .
            "last_replication_date is '$last_replication_date'.\n";
    } else {
        if ($current_replication_sequence != $REPLICATION_SEQUENCE) {
            die "Applied changes, but current_replication_sequence is $current_replication_sequence not $REPLICATION_SEQUENCE\n";
        }
        # A timestamp mismatch is only reported, not treated as fatal.
        if (defined($TIMESTAMP) && $last_replication_date ne $TIMESTAMP) {
            warn "Applied changes, but last_replication_date is '$last_replication_date' not '$TIMESTAMP'\n";
        }
    }
    # Continue from whatever sequence the database now reports.
    $iReplicationSequence = $current_replication_sequence;
}

# One more packet applied; stop once the --limit (if any) has been reached.
$count++;
if ($limit > 0 and $count >= $limit) {
    print localtime() . " : Hit packet limit $limit.\n";
    exit 0;
}

# Otherwise go round again for the following packet.
goto NEXT_PACKET;

# Slurp one file from the unpacked packet directory.
#
# Takes a file name relative to $mydir.  Returns the whole contents as a
# single string, or undef if the file cannot be opened.
sub read_file
{
    my $file = shift;
    # Three-argument open, so the path is never parsed for mode characters.
    open(my $fh, '<', "$mydir/$file")
        or return undef;
    # With the input record separator undefined, one read returns everything.
    local $INPUT_RECORD_SEPARATOR;
    my $contents = <$fh>;
    return $contents;
}

# Check that there are no foreign key constraints which link non-replicated
# tables to replicated ones.  The existence of such constraints would almost
# certainly cause the processing of replication data to fail.

# Look for foreign keys that point from a non-empty, non-replicated table to
# a replicated one.  If any are found, print how to resolve them and exit
# with status 1; otherwise return without printing anything.
sub check_foreign_keys
{
    # Names of the non-replicated tables.  Nothing is registered here, so as
    # written no constraint can be selected below.
    my %non_rep = ();

    my $constraints = $sql->select_list_of_hashes(q{
        SELECT  c.conname, a.relname AS fk_table, b.relname AS pk_table
        FROM    pg_constraint c, pg_class a, pg_class b
        WHERE   a.oid = c.conrelid
        AND             b.oid = c.confrelid
        AND             contype = 'f'
    });

    # Keep constraints whose referencing table is non-replicated while the
    # referenced table is not.
    my @offending;
    for my $fk (@$constraints)
    {
        next unless $non_rep{ $fk->{fk_table} };
        next if $non_rep{ $fk->{pk_table} };
        push @offending, $fk;
    }

    # Count the rows in each referencing table ...
    for my $fk (@offending)
    {
        $fk->{fk_rows} = $sql->select_single_value(
                "SELECT COUNT(*) FROM $fk->{fk_table}",
        );
    }

    # ... and disregard the tables that turn out to be empty.
    @offending = grep { $_->{fk_rows} } @offending;

    return unless @offending;

    print localtime() . " : Problem with foreign keys detected:\n";
    print <<EOF;

Your database includes foreign keys from non-replicated tables to replicated
ones, and the non-replicated tables are not empty.  This will almost certainly
cause replication processing to fail.  To load replication data, you'll need
to do one or both of the following:

EOF

    print "Remove the foreign keys:\n";
    for my $fk (@offending)
    {
        printf "  ALTER TABLE %s DROP CONSTRAINT %s; -- refers to %s\n",
            $fk->{fk_table}, $fk->{conname}, $fk->{pk_table};
    }
    print "\n";

    print "Empty the tables:\n";
    for my $fk (@offending)
    {
        printf "  DELETE FROM %s;\n", $fk->{fk_table};
    }
    print "\n";

    print localtime() . " : Replication abandoned\n";
    exit 1;
}

=head1 COPYRIGHT AND LICENSE

Copyright (C) 1998 Robert Kaye

This file is part of MusicBrainz, the open internet music database,
and is licensed under the GPL version 2, or (at your option) any
later version: http://www.gnu.org/licenses/gpl-2.0.txt

=cut
